{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "kernelspec": {
      "display_name": "Python 3",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.7.5rc1"
    },
    "colab": {
      "name": "template_beam.ipynb",
      "provenance": [],
      "collapsed_sections": []
    }
  },
  "cells": [
    {
      "cell_type": "code",
      "metadata": {
        "id": "WFgojohE42wu",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "##### Copyright \\u0026copy; 2020 The TensorFlow Authors."
      ],
      "execution_count": 0,
      "outputs": []
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "4ftGcLFZ42wx",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "#@title Licensed under the Apache License, Version 2.0 (the \\\"License\\\");\n",
        "# you may not use this file except in compliance with the License.\n",
        "# You may obtain a copy of the License at\n",
        "#\n",
        "# https://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing, software\n",
        "# distributed under the License is distributed on an \\\"AS IS\\\" BASIS,\n",
        "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
        "# See the License for the specific language governing permissions and\n",
        "# limitations under the License."
      ],
      "execution_count": 0,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "HqZC7Cv942w0",
        "colab_type": "text"
      },
      "source": [
        "# Create a TFX pipeline using templates with Beam orchestrator"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "YRBoc5la42w0",
        "colab_type": "text"
      },
      "source": [
        "## Introduction\n",
        "\n",
        "This document will provide instructions to create a TensorFlow Extended (TFX) pipeline\n",
        "using *templates* which are provided with TFX Python package.\n",
        "Most of instructions are Linux shell commands, and corresponding\n",
        "Jupyter Notebook code cells which invoke those commands using `!` are provided.\n",
        "\n",
        "You will build a pipeline using [Taxi Trips dataset](\n",
        "https://data.cityofchicago.org/Transportation/Taxi-Trips/wrvz-psew)\n",
        "released by the City of Chicago. We strongly encourage you to try to build\n",
        "your own pipeline using your dataset by utilizing this pipeline as a baseline.\n",
        "\n",
        "We will build a pipeline using [Apache Beam Orchestrator](https://www.tensorflow.org/tfx/guide/beam_orchestrator). If you are interested in using Kubeflow orchestrator on Google Cloud, please see [TFX on Cloud AI Platform Pipelines tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/cloud-ai-platform-pipelines).\n",
        "\n",
        "## Prerequisites\n",
        "\n",
        "* Linux / MacOS\n",
        "* Python >= 3.5.3\n",
        "\n",
        "You can get all prerequisites easily by [running this notebook on Google Colab](https://colab.sandbox.google.com/github/tensorflow/tfx/blob/master/docs/tutorials/tfx/template_beam.ipynb).\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "TRW5apUw42w1",
        "colab_type": "text"
      },
      "source": [
        "## Step 1. Set up your environment.\n",
        "\n",
        "**Throughout this document, we will present commands twice. Once as a copy-and-paste-ready shell command, once as a jupyter notebook cell. If you are using Colab, just skip shell script block and execute notebook cells.**\n",
        "\n",
        "You should prepare a development environment to build a pipeline.\n",
        "\n",
        "Install `tfx` python package. We recommend use of `virtualenv` in the local environment. You can use following shell script snippet to set up your environment.\n",
        "\n",
        "```sh\n",
        "# Create a virtualenv for tfx.\n",
        "virtualenv -p python3 venv\n",
        "source venv/bin/activate\n",
        "# Install python packages.\n",
        "python -m pip install --user --upgrade tfx==0.22.0\n",
        "```\n",
        "If you are using colab:\n"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "llKzIjr442w1",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "import sys\n",
        "!{sys.executable} -m pip install --user --upgrade -q tfx==0.21.4"
      ],
      "execution_count": 0,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "5NA86S2v42w1",
        "colab_type": "text"
      },
      "source": [
        "NOTE: There might be some errors during package installation. For example,\n",
        "\n",
        ">ERROR: some-package 0.some_version.1 has requirement other-package!=2.0.,&lt;3,&gt;=1.15, but you'll have other-package 2.0.0 which is incompatible.\n",
        "\n",
        "Please ignore these errors at this moment."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "m6-DrWm042w4",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "# Set `PATH` to include user python binary directory.\n",
        "HOME=%env HOME\n",
        "PATH=%env PATH\n",
        "%env PATH={PATH}:{HOME}/.local/bin"
      ],
      "execution_count": 0,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "YszwBVuS42w6",
        "colab_type": "text"
      },
      "source": [
        "Let's check the version of TFX.\n",
        "```bash\n",
        "python -c \"import tfx; print('TFX version: {}'.format(tfx.__version__))\"\n",
        "```"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "sBLyQWYF42w6",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "!python3 -c \"import tfx; print('TFX version: {}'.format(tfx.__version__))\""
      ],
      "execution_count": 0,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ycspntQk42xF",
        "colab_type": "text"
      },
      "source": [
        "And, it's done. We are ready to create a pipeline."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "BoSOcmEB42xF",
        "colab_type": "text"
      },
      "source": [
        "## Step 2. Copy predefined template to your project directory.\n",
        "\n",
        "In this step, we will create a working pipeline project directory and files by copying additional files from a predefined template.\n",
        "\n",
        "You may give your pipeline a different name by changing the `PIPELINE_NAME` below. This will also become the name of the project directory where your files will be put.\n",
        "\n",
        "```bash\n",
        "export PIPELINE_NAME=\"my_pipeline\"\n",
        "export PROJECT_DIR=~/tfx/${PIPELINE_NAME}\n",
        "```"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "IYGyT4ib42xG",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "PIPELINE_NAME=\"my_pipeline\"\n",
        "import os\n",
        "# Create a project directory under Colab content directory.\n",
        "PROJECT_DIR=os.path.join(os.sep,\"content\",PIPELINE_NAME)"
      ],
      "execution_count": 0,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "GjXe_3oY42xI",
        "colab_type": "text"
      },
      "source": [
        "TFX includes the `taxi` template with the TFX python package. If you are planning to solve a point-wise prediction problem, including classification and regresssion, this template could be used as a starting point.\n",
        "\n",
        "The `tfx template copy` CLI command copies predefined template files into your project directory.\n",
        "\n",
        "```sh\n",
        "tfx template copy \\\n",
        "   --pipeline_name=\"${PIPELINE_NAME}\" \\\n",
        "   --destination_path=\"${PROJECT_DIR}\" \\\n",
        "   --model=taxi\n",
        "```"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "3PmXatBD42xI",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "!tfx template copy \\\n",
        "  --pipeline_name={PIPELINE_NAME} \\\n",
        "  --destination_path={PROJECT_DIR} \\\n",
        "  --model=taxi"
      ],
      "execution_count": 0,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "fyhkhhxY42xK",
        "colab_type": "text"
      },
      "source": [
        "Change the working directory context in this notebook to the project directory.\n",
        "\n",
        "```bash\n",
        "cd ${PROJECT_DIR}\n",
        "```"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "y9e_g5rc42xL",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "%cd {PROJECT_DIR}"
      ],
      "execution_count": 0,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "2rKBRbE342xN",
        "colab_type": "text"
      },
      "source": [
        "## Step 3. Browse your copied source files."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "QdiHik_w42xN",
        "colab_type": "text"
      },
      "source": [
        "The TFX template provides basic scaffold files to build a pipeline, including Python source code, sample data, and Jupyter Notebooks to analyse the output of the pipeline. The `taxi` template uses the same *Chicago Taxi* dataset and ML model as the [Airflow Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/airflow_workshop).\n",
        "\n",
        "In Google Colab, you can browse files by clicking a folder icon on the left. Files should be copied under the project directoy, whose name is `my_pipeline` in this case. You can click directory names to see the content of the directory, and double-click file names to open them.\n",
        "\n",
        "Here is brief introduction to each of the Python files.\n",
        "-   `pipeline` - This directory contains the definition of the pipeline\n",
        "    -   `configs.py` — defines common constants for pipeline runners\n",
        "    -   `pipeline.py` — defines TFX components and a pipeline\n",
        "-   `models` - This directory contains ML model definitions.\n",
        "    -   `features.py`, `features_test.py` — defines features for the model\n",
        "    -   `preprocessing.py`, `preprocessing_test.py` — defines preprocessing\n",
        "        jobs using `tf::Transform`\n",
        "    -   `estimator` - This directory contains an Estimator based model.\n",
        "        -   `constants.py` — defines constants of the model\n",
        "        -   `model.py`, `model_test.py` — defines DNN model using TF estimator\n",
        "    -   `keras` - This directory contains a Keras based model.\n",
        "        -   `constants.py` — defines constants of the model\n",
        "        -   `model.py`, `model_test.py` — defines DNN model using Keras\n",
        "-   `beam_dag_runner.py`, `kubeflow_dag_runner.py` — define runners for each orchestration engine\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "WmWtR_Eq42xQ",
        "colab_type": "text"
      },
      "source": [
        "You might notice that there are some files with `_test.py` in their name. These are unit tests of the pipeline and it is recommended to add more unit tests as you implement your own pipelines.\n",
        "You can run unit tests by supplying the module name of test files with `-m` flag. You can usually get a module name by deleting `.py` extension and replacing `/` with `.`.  For example:\n",
        "\n",
        "```bash\n",
        "python -m models.features_test\n",
        "```"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "H0DzGg-642xQ",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "!{sys.executable} -m models.features_test\n",
        "!{sys.executable} -m models.keras.model_test\n"
      ],
      "execution_count": 0,
      "outputs": []
    },
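    {
      "cell_type": "markdown",
      "metadata": {
        "colab_type": "text"
      },
      "source": [
        "The module-naming rule for test files (drop the `.py` extension, replace `/` with `.`) can be sketched in a few lines of Python. This is only an illustration and not part of the TFX template; the helper name `module_name_from_path` is hypothetical.\n",
        "\n",
        "```python\n",
        "import os\n",
        "\n",
        "\n",
        "def module_name_from_path(path):\n",
        "  # Split off the extension and make sure it is a Python file.\n",
        "  root, ext = os.path.splitext(path)\n",
        "  if ext != '.py':\n",
        "    raise ValueError('Expected a .py file, got: ' + path)\n",
        "  # Turn directory separators into dots to get the module name.\n",
        "  return root.replace('/', '.')\n",
        "\n",
        "\n",
        "print(module_name_from_path('models/features_test.py'))\n",
        "print(module_name_from_path('models/keras/model_test.py'))\n",
        "```\n",
        "\n",
        "The printed names are the values you would pass to `python -m` from the project directory."
      ]
    },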
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "fK_C6C6g42xS",
        "colab_type": "text"
      },
      "source": [
        "## Step 4. Run your first TFX pipeline\n",
        "\n",
        "You can create a pipeline using `pipeline create` command.\n",
        "```bash\n",
        "tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py\n",
        "```\n"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "D5YikNik42xX",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "!tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py"
      ],
      "execution_count": 0,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "6kvZIn8142xZ",
        "colab_type": "text"
      },
      "source": [
        "Then, you can run the created pipeline using `run create` command.\n",
        "```sh\n",
        "tfx run create --engine=beam --pipeline_name=\"${PIPELINE_NAME}\"\n",
        "```"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "SnTC_Rql42xZ",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "!tfx run create --engine=beam --pipeline_name={PIPELINE_NAME}"
      ],
      "execution_count": 0,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "GM1G1Efw42xb",
        "colab_type": "text"
      },
      "source": [
        "If successful, you'll see `Component CsvExampleGen is finished.` When you copy the template, only one component, CsvExampleGen, is included in the pipeline."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "6pfQcePs42xc",
        "colab_type": "text"
      },
      "source": [
        "## Step 5. Add components for data validation.\n",
        "\n",
        "In this step, you will add components for data validation including `StatisticsGen`, `SchemaGen`, and `ExampleValidator`. If you are interested in data validation, please see [Get started with Tensorflow Data Validation](https://www.tensorflow.org/tfx/data_validation/get_started).\n",
        "\n",
        "We will modify copied pipeline definition in `pipeline/pipeline.py`. If you are working on your local environment, use your favorite editor to edit the file. If you are working on Google Colab, \n",
        "\n",
        ">**Click folder icon on the left to open `Files` view**.\n",
        "\n",
        ">**Click `my_pipeline` to open the directory and click `pipeline` directory to open and double-click `pipeline.py` to open the file**.\n",
        "\n",
        ">Find and uncomment the 3 lines which add `StatisticsGen`, `SchemaGen`, and `ExampleValidator` to the pipeline. (Tip: find comments containing `TODO(step 5):`).\n",
        "\n",
        "> Your change will be saved automatically in a few seconds. Make sure that the `*` mark in front of the `pipeline.py` disappeared in the tab title. **There is no save button or shortcut for the file editor in Colab. Python files in file editor can be saved to the runtime environment even in `playground` mode.**\n",
        "\n",
        "You now need to update the existing pipeline with modified pipeline definition. Use the `tfx pipeline update` command to update your pipeline, followed by the `tfx run create` command to create a new execution run of your updated pipeline.\n",
        "\n",
        "```sh\n",
        "# Update the pipeline\n",
        "tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py\n",
        "# You can run the pipeline the same way.\n",
        "tfx run create --engine beam --pipeline_name \"${PIPELINE_NAME}\"\n",
        "```\n"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "wMsT-5EX42xc",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "# Update the pipeline\n",
        "!tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py\n",
        "# You can run the pipeline the same way.\n",
        "!tfx run create --engine beam --pipeline_name {PIPELINE_NAME}"
      ],
      "execution_count": 0,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "IUU5_3yR42xe",
        "colab_type": "text"
      },
      "source": [
        "You should be able to see the output log from the added components. Our pipeline creates output artifacts in `tfx_pipeline_output/my_pipeline` directory."
      ]
    },
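    {
      "cell_type": "markdown",
      "metadata": {
        "colab_type": "text"
      },
      "source": [
        "To get a quick overview of what a run produced, you can list the directories under the pipeline output root. The sketch below assumes the current working directory is the project directory and that artifacts live under `tfx_pipeline_output/my_pipeline`, as stated above. The helper name `list_subdirs` is hypothetical, and the exact directory layout depends on your TFX version.\n",
        "\n",
        "```python\n",
        "import os\n",
        "\n",
        "\n",
        "def list_subdirs(root, max_depth=2):\n",
        "  # Collect subdirectories of `root`, as paths relative to `root`,\n",
        "  # going at most `max_depth` levels down.\n",
        "  found = []\n",
        "  for dirpath, dirnames, _ in os.walk(root):\n",
        "    rel = os.path.relpath(dirpath, root)\n",
        "    depth = 0 if rel == '.' else rel.count(os.sep) + 1\n",
        "    if depth >= max_depth:\n",
        "      # Clearing `dirnames` in place stops os.walk from descending further.\n",
        "      dirnames[:] = []\n",
        "    if rel != '.':\n",
        "      found.append(rel)\n",
        "  return sorted(found)\n",
        "\n",
        "\n",
        "# Assumed location, taken from the text above; adjust it if yours differs.\n",
        "output_root = os.path.join('tfx_pipeline_output', 'my_pipeline')\n",
        "for subdir in list_subdirs(output_root):\n",
        "  print(subdir)\n",
        "```\n",
        "\n",
        "If the directory does not exist yet, the loop simply prints nothing."
      ]
    },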
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "7p7CLDvD42xe",
        "colab_type": "text"
      },
      "source": [
        "## Step 6. Add components for training.\n",
        "\n",
        "In this step, you will add components for training and model validation including `Transform`, `Trainer`, `ResolverNode`, `Evaluator`, and `Pusher`.\n",
        "\n",
 **Open `pipeline/pipeline.py`**. Find">
        "> **Open `pipeline/pipeline.py`**. Find and uncomment the 5 lines which add `Transform`, `Trainer`, `ResolverNode`, `Evaluator`, and `Pusher` to the pipeline. (Tip: find `TODO(step 6):`)\n",
        "\n",
        "As you did before, you now need to update the existing pipeline with the modified pipeline definition. The instructions are the same as Step 5. Update the pipeline using `tfx pipeline update`, and create an execution run using `tfx run create`.\n",
        "\n",
        "\n",
        "```sh\n",
        "tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py\n",
        "tfx run create --engine beam --pipeline_name \"${PIPELINE_NAME}\"\n",
        "```"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "Ik8JbnRq42xf",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "!tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py\n",
        "!tfx run create --engine beam --pipeline_name {PIPELINE_NAME}"
      ],
      "execution_count": 0,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "E3L3NEPanUGY",
        "colab_type": "text"
      },
      "source": [
        "When this execution run finishes successfully, you have now created and run your first TFX pipeline using Beam orchestrator!"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "QjcMwjov42xh",
        "colab_type": "text"
      },
      "source": [
        "## Step 7. (*Optional*) Try BigQueryExampleGen.\n",
        "[BigQuery] is a serverless, highly scalable, and cost-effective cloud data warehouse. BigQuery can be used as a source for training examples in TFX. In this step, we will add `BigQueryExampleGen` to the pipeline.\n",
        "\n",
        "You need a [Google Cloud Platform](https://cloud.google.com/gcp/getting-started) account to use BigQuery. Please prepare a GCP project.\n",
        "\n",
        "Login to your project using colab auth library or `gcloud` utility.\n",
        "```sh\n",
        "# You need `gcloud` tool to login in local shell environment.\n",
        "gcloud auth login\n",
        "```"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "2K7nuHZ4uNXc",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "if 'google.colab' in sys.modules:\n",
        "  from google.colab import auth\n",
        "  auth.authenticate_user()\n",
        "  print('Authenticated')\n"
      ],
      "execution_count": 0,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Win212lr03zP",
        "colab_type": "text"
      },
      "source": [
        "You should specify your GCP project name to access BigQuery resources using TFX. Set `GOOGLE_CLOUD_PROJECT` environment variable to your project name.\n",
        "\n",
        "```sh\n",
        "export GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE\n",
        "```"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "Vvpw_lGByxSx",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "# Set your project name below.\n",
        "# WARNING! ENTER your project name before running this cell.\n",
        "%env GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE"
      ],
      "execution_count": 0,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MhClPWEuuOaP",
        "colab_type": "text"
      },
      "source": [
 **Open `pipeline/pipeline.py`**. Comment">
        "> **Open `pipeline/pipeline.py`**. Comment out `CsvExampleGen` and uncomment the line which creates an instance of `BigQueryExampleGen`. You also need to uncomment the `query` argument of the `create_pipeline` function.\n",
        "\n",
        "We need to specify which GCP project to use for BigQuery again, and this is done by setting `--project` in `beam_pipeline_args` when creating a pipeline.\n",
        "\n",
 **Open `pipeline/configs.py`**. Uncomment">
        "> **Open `pipeline/configs.py`**. Uncomment the definitions of `BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS` and `BIG_QUERY_QUERY`. You should replace the project id and the region value in this file with the correct values for your GCP project.\n",
        "\n",
        "> **Open `beam_dag_runner.py`**. Uncomment the two arguments, `query` and `beam_pipeline_args`, of the `create_pipeline()` call.\n",
        "\n",
        "Now the pipeline is ready to use BigQuery as an example source. Update the pipeline and create a run as we did in Steps 5 and 6."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "w8rOdC3r42xi",
        "colab_type": "code",
        "colab": {}
      },
      "source": [
        "!tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py\n",
        "!tfx run create --engine beam --pipeline_name {PIPELINE_NAME}"
      ],
      "execution_count": 0,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "wxYxNHLN42xo",
        "colab_type": "text"
      },
      "source": [
        "## What's next: Ingest YOUR data to the pipeline.\n",
        "\n",
        "We made a pipeline for a model using the Chicago Taxi dataset. Now it's time to put your data into the pipeline.\n",
        "\n",
        "Your data can be stored anywhere your pipeline can access, including GCS, or BigQuery. You will need to modify the pipeline definition to access your data.\n",
        "\n",
        "1. If your data is stored in files, modify the `DATA_PATH` in `kubeflow_dag_runner.py` or `beam_dag_runner.py` and set it to the location of your files. If your data is stored in BigQuery, modify `BIG_QUERY_QUERY` in `pipeline/configs.py` to correctly query for your data.\n",
        "1. Add features in `models/features.py`.\n",
        "1. Modify `models/preprocessing.py` to [transform input data for training](https://www.tensorflow.org/tfx/guide/transform).\n",
        "1. Modify `models/keras/model.py` and `models/keras/constants.py` to [describe your ML model](https://www.tensorflow.org/tfx/guide/trainer).\n",
        "  - You can use an estimator based model, too. Change `RUN_FN` constant to `models.estimator.model.run_fn` in `pipeline/configs.py`.\n",
        "\n",
        "Please see [Trainer component guide](https://www.tensorflow.org/tfx/guide/trainer) for more introduction."
      ]
    }
  ]
}
